feat: add executor pool support#687
Conversation
27790b7 to
6694105
Compare
wgtmac
left a comment
There was a problem hiding this comment.
Nice work. The abstraction is clean and the test coverage is solid. Since the default executor is nullopt, existing paths stay single-threaded, so the risk is well contained.
A couple of things before merge:
- The PR has no description. Worth adding the motivation and a short design note, ideally with the parallel/serial numbers that justify it.
- The
RetryRunnerchange from a runtime fluent API (OnlyRetryOn/StopRetryOn) to the compile-timeRetryRunner<Policy>is a breaking API change that's independent of executor support. Consider splitting it into its own PR so it gets reviewed on its own merits.
Left a few inline notes.
|
|
||
| ExecutorTask executor_task( | ||
| [promise = std::move(promise), task = std::move(task)]() mutable { | ||
| promise.set_value(std::move(task)()); |
There was a problem hiding this comment.
If a task throws instead of returning a Status, the promise is never set and the exception escapes on the worker thread, while future.get() sees a broken promise. The repo uses Result/Status rather than exceptions, so this is unlikely in our own code, but Executor is a public extension point and users may plug in pools where it matters. Wrapping the call in try/catch and turning the exception into a Status would be safer.
There was a problem hiding this comment.
Errors resulting from violating calling conventions are the user's responsibility, not the library's.
| if (!executor_.has_value()) { | ||
| return internal::RunTasksSingleThreaded(std::move(tasks_)); | ||
| } | ||
| return internal::RunTasksParallel(executor_->get(), std::move(tasks_)); |
There was a problem hiding this comment.
Run() blocks on future.get() from the calling thread. That's fine for the current sequential call sites, but PlanWith is now public API. If someone drives a TaskGroup from a worker thread of the same bounded executor, it can deadlock (pool saturated while waiting on a task queued behind it). Worth documenting that the driving thread must not be one of the executor's own workers.
There was a problem hiding this comment.
The issue of a thread pool's tasks spawning more tasks and then blocking themselves is a problem that I believe any thread pool with a thread limit will encounter, not just a problem with this particular PR.
wgtmac
left a comment
There was a problem hiding this comment.
A few more minor notes, non-blocking.
wgtmac
left a comment
There was a problem hiding this comment.
Another pass, this time on the interface design and extensibility.
The overall shape is good: one virtual Executor that engines implement, threaded through the builders via PlanWith. The Arrow adapter test is a nice proof that an external pool drops in with basically one line, so "bring your own threadpool" is well covered.
On future async directions (C++23 coroutines, P2300 std::execution): the model here is synchronous and blocking, TaskGroup::Run() fans out and blocks on std::future::get(), and the planning APIs return Result<...> directly. So this is a parallel-for primitive, not a step toward a sender/receiver pipeline. That's a reasonable scope for now, I'd just flag it explicitly so nobody expects this interface to extend into async later, it'll be a separate one. Details inline.
| virtual ~Executor() = default; | ||
|
|
||
| /// \brief Schedule a task for execution. | ||
| virtual Status Submit(ExecutorTask task) = 0; |
There was a problem hiding this comment.
This is a fire-and-forget execute-style primitive (closer to the abandoned P0443 executor.execute than to P2300's scheduler/sender). Completion is tracked outside, via the std::promise/future plumbing in RunTasksParallel. Fine for a blocking parallel-for, but it doesn't lay groundwork for coroutines or std::execution: those need the executor to hand back something awaitable/composable, and the planning APIs (PlanFiles() -> Result<...>) are synchronous anyway. Going async later would be a separate interface, not an extension of this one. Worth stating in the header that Executor is a parallel-dispatch primitive, not an async scheduler.
Separately: ExecutorTask being move-only is the right call and matches Arrow, but pools whose submit takes a copyable std::function will need std::move_only_function or a small shim to adapt.
| virtual Status Submit(ExecutorTask task) = 0; | ||
| }; | ||
|
|
||
| using OptionalExecutor = std::optional<std::reference_wrapper<Executor>>; |
There was a problem hiding this comment.
std::optional<std::reference_wrapper<Executor>> is a little awkward to use (executor_->get() at the call sites). A plain Executor* expresses "nullable, non-owning borrow" just as well and reads cleaner everywhere it's passed. Minor.
There was a problem hiding this comment.
Holding a pointer requires a check to determine whether it needs to be destructed, which is detrimental to future code maintainability.
| futures.reserve(tasks.size()); | ||
|
|
||
| std::vector<Error> errors; | ||
| for (auto& task : tasks) { |
There was a problem hiding this comment.
All tasks are submitted up front, each with its own promise/future. For a handful of manifests that's fine, but as a general primitive there's no concurrency bound and no fail-fast: N tasks always queue N and allocate N futures, and if one fails early the rest still run to completion. Probably out of scope for this PR, but worth a note if engines may push large fan-outs through here.
wgtmac
left a comment
There was a problem hiding this comment.
One more pass. The main thing I found is a documentation gap around a new concurrency contract that now leaks onto user-supplied callbacks.
Once an executor is set, the user's ManifestWriterFactory and the shared FileIO get called from multiple worker threads. The tests already account for this (the factories use an atomic counter plus a barrier), so the requirement is understood, it's just not written down anywhere a downstream engine would see it. Worth documenting on the public surface.
No description provided.